
Parallelizes Data.content_hash() for large datasets #86

Open
JyotinderSingh wants to merge 2 commits into main from parallel-hash

Conversation


@JyotinderSingh JyotinderSingh commented Mar 10, 2026

Problem

Data.content_hash() hashes all file contents sequentially in a single thread. For large datasets (e.g. 20 GB across hundreds of thousands of files), this becomes the bottleneck before GCS upload: the method spends most of its time blocked on I/O reads that could be issued concurrently.

Solution

This change introduces a two-level parallel hashing algorithm:

  1. We first use os.walk to collect all file paths, sorted once at the end for determinism.
  2. Each file is hashed independently (SHA-256(relpath + \0 + contents)) via a ThreadPoolExecutor. Work is submitted in batches of 512 to avoid creating one Future per file (which would mean 1M Future objects for a directory with a million files).
  3. Per-file digests (fixed 32 bytes each) are fed in sorted order into a final SHA-256("dir:" + digest_1 + digest_2 + ...).

The GIL is released during open(), read(), and hashlib.update() (for inputs > 2048 bytes), so threads achieve real I/O parallelism and saturate SSD/NVMe queue depth across multiple NAND channels.

Design decisions

| Decision | Rationale |
| --- | --- |
| ThreadPoolExecutor over multiprocessing | I/O-bound workload; the GIL is released during file I/O and hashing. Avoids process creation and serialization overhead. |
| Batch size of 512 | Balances thread scheduling overhead vs. parallelism. Reduces Future count from 1M to ~2K for million-file datasets. |
| Threshold of 16 files | Small directories skip thread pool creation entirely, avoiding overhead for the common case in tests and small datasets. |
| Worker count min(32, cpu_count + 4) | Python stdlib default for I/O-bound work. Capped at 32 to limit file descriptor usage (reference). |
| Two-level hash (per-file then combine) | Enables independent parallel hashing while maintaining determinism. |

Breaking change

This changes the hash algorithm from single-pass incremental to two-level, so existing hash values will differ. This is acceptable.

Benchmarking

I used a one-off benchmarking script to understand the performance characteristics of the multi-threaded approach. The following numbers were captured on a 2021 MacBook Pro with an M1 Pro processor.

System: 10 CPUs

```text
============================================================
  10 files x 50.0 MB each = 500.0 MB total
  CPUs: 10, runs: 3 (median reported)
============================================================
  Creating dataset... done (0.3s)
  Hashing... 0.260s  (1925 MB/s)  hash=9bc75b07a5a2

============================================================
  1,000 files x 512.0 KB each = 500.0 MB total
  CPUs: 10, runs: 3 (median reported)
============================================================
  Creating dataset... done (0.5s)
  Hashing... 0.157s  (3194 MB/s)  hash=6aac431bfe51

============================================================
  50,000 files x 10.2 KB each = 500.0 MB total
  CPUs: 10, runs: 3 (median reported)
============================================================
  Creating dataset... done (5.0s)
  Hashing... 3.844s  (130 MB/s)  hash=e0e54a4edee6

============================================================
  200,000 files x 2.6 KB each = 500.0 MB total
  CPUs: 10, runs: 3 (median reported)
============================================================
  Creating dataset... done (20.7s)
  Hashing... 12.507s  (40 MB/s)  hash=4bb6b3f1a45c

============================================================
  1,000,000 files x 20.5 KB each = 19.5 GB total
  CPUs: 10, runs: 3 (median reported)
============================================================
  Creating dataset... done (126.7s)
  Hashing... 70.933s  (282 MB/s)  hash=078061de1d64
```


@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the performance of Data.content_hash() by introducing a parallel hashing mechanism. Previously, hashing large datasets was a bottleneck due to sequential I/O operations. The new approach employs a two-level parallel algorithm that processes files concurrently using a thread pool, specifically designed to optimize I/O-bound workloads. This change will drastically reduce the time required to compute content hashes for large directories, improving overall efficiency, though it does result in a change to the generated hash values.

Highlights

  • Parallel Hashing for Large Datasets: Implemented a two-level parallel hashing algorithm for Data.content_hash() to significantly improve performance when dealing with large datasets by concurrently processing file I/O.
  • Two-Level Hashing Algorithm: The new algorithm hashes each file independently (SHA-256 of relpath + \0 + contents), and then combines these per-file digests in sorted order into a final SHA-256 hash.
  • ThreadPoolExecutor for I/O-bound Workloads: Utilizes Python's ThreadPoolExecutor for parallel file hashing, leveraging the Global Interpreter Lock (GIL) release during I/O operations and hashlib.update() for real I/O parallelism.
  • Optimized Parallelism: Introduced batching of file hashing tasks (batch size 512) to reduce Future object overhead and a threshold (16 files) to avoid thread pool creation overhead for small directories.
  • Breaking Change: The underlying hash algorithm has changed, which means existing Data.content_hash() values will differ from those generated by this new implementation.


Changelog
  • keras_remote/data/data.py
    • Added ThreadPoolExecutor and partial imports.
    • Defined constants _PARALLEL_HASH_THRESHOLD and _HASH_BATCH_SIZE.
    • Introduced _hash_single_file and _hash_file_batch helper functions for file hashing.
    • Updated Data.content_hash docstring to reflect the new two-level hashing.
    • Refactored Data.content_hash to dispatch to _content_hash_file or _content_hash_dir.
    • Implemented _content_hash_file for single file hashing.
    • Implemented _content_hash_dir for parallel directory hashing using ThreadPoolExecutor with batching.
  • keras_remote/data/data_test.py
    • Imported _PARALLEL_HASH_THRESHOLD.
    • Added test_parallel_determinism_many_files to verify hash determinism for large directories.
    • Added test_parallel_threshold_boundary to test hashing behavior at the parallel threshold.

@gemini-code-assist gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request introduces a well-designed parallel hashing algorithm to improve performance for large datasets, which is a great enhancement. The use of a two-level hash with a ThreadPoolExecutor is a solid approach for this I/O-bound task. The code is clear, and the addition of tests for determinism and boundary conditions is excellent. I have one minor suggestion to make the code slightly more concise and Pythonic.

@jeffcarp jeffcarp (Member) left a comment

Is there a real dataset that's posing an issue prompting this optimization or is it a future theoretical issue?

I ask because I think orgs with sufficiently large datasets might have their own custom data ingestion setups.

```python
h.update(relpath.encode("utf-8"))
h.update(b"\0")
with open(fpath, "rb") as f:
    for chunk in iter(partial(f.read, 65536), b""):
```
Member

Can you leave a comment as to why it's chunked this way

Collaborator Author

@JyotinderSingh JyotinderSingh Mar 10, 2026

I just aligned with the default chunk size for Python stdlib utilities; 64–256 KB is also the standard NVMe page size.

shutil.copyfileobj uses a 64 KB chunk, while hashlib.file_digest uses a 256 KB chunk.

Collaborator Author

JyotinderSingh commented Mar 10, 2026

> Is there a real dataset that's posing an issue prompting this optimization or is it a future theoretical issue?
>
> I ask because I think orgs with sufficiently large datasets might have their own custom data ingestion setups.

I added this as a basic optimization so we can hash larger datasets more efficiently. I've added benchmark numbers to the PR description.

@divyashreepathihalli divyashreepathihalli (Collaborator) left a comment

Thanks for the PR. Just one comment

```python
def _content_hash_dir(self) -> str:
    # Enumerate all files. Walk in filesystem order (better disk
    # locality) and sort once at the end for determinism.
    file_list = []
```
Collaborator

Building a full file_list array covering every file in memory before chunking triggers the same RAM-saturation pattern found in other parts of the client/runner. For massive datasets (e.g., 20M+ files), iterating over the structure can cause an Out-Of-Memory (OOM) crash before the pool map even launches.
